/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.coders;



import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.schemas.Schema.Field;

import com.bff.gaia.unified.sdk.schemas.Schema.FieldType;

import com.bff.gaia.unified.sdk.schemas.Schema.TypeName;

import com.bff.gaia.unified.sdk.util.SerializableUtils;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableMap;



import javax.annotation.Nullable;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.util.List;

import java.util.Map;

import java.util.UUID;

import java.util.stream.Collectors;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**
 * A {@link Coder} for {@link Row}.
 *
 * <p>Encoding and decoding are delegated to a coder obtained from {@link RowCoderGenerator} for
 * the row's {@link Schema}. The delegate is created lazily on first use and is {@code transient},
 * so it is rebuilt after this coder has been deserialized.
 */
@Experimental
public class RowCoder extends CustomCoder<Row> {
  // Coders for the primitive (non-composite) field types. Composite types (ROW, ARRAY, MAP) and
  // logical types are resolved in coderForFieldType.
  static final ImmutableMap<TypeName, Coder> CODER_MAP =
      ImmutableMap.<TypeName, Coder>builder()
          .put(TypeName.BYTE, ByteCoder.of())
          .put(TypeName.BYTES, ByteArrayCoder.of())
          .put(TypeName.INT16, BigEndianShortCoder.of())
          .put(TypeName.INT32, VarIntCoder.of())
          .put(TypeName.INT64, VarLongCoder.of())
          .put(TypeName.DECIMAL, BigDecimalCoder.of())
          .put(TypeName.FLOAT, FloatCoder.of())
          .put(TypeName.DOUBLE, DoubleCoder.of())
          .put(TypeName.STRING, StringUtf8Coder.of())
          .put(TypeName.DATETIME, InstantCoder.of())
          .put(TypeName.BOOLEAN, BooleanCoder.of())
          .build();

  // Nominal per-value size estimates, in bytes, for the fixed-width field types. Variable-size
  // types (STRING, BYTES, ARRAY, MAP, ROW) are measured from the value in estimatedSizeBytes.
  private static final ImmutableMap<TypeName, Integer> ESTIMATED_FIELD_SIZES =
      ImmutableMap.<TypeName, Integer>builder()
          .put(TypeName.BYTE, Byte.BYTES)
          .put(TypeName.INT16, Short.BYTES)
          .put(TypeName.INT32, Integer.BYTES)
          .put(TypeName.INT64, Long.BYTES)
          .put(TypeName.FLOAT, Float.BYTES)
          .put(TypeName.DOUBLE, Double.BYTES)
          .put(TypeName.DECIMAL, 32)
          .put(TypeName.BOOLEAN, 1)
          .put(TypeName.DATETIME, Long.BYTES)
          .build();

  private final Schema schema;
  private final UUID id;

  // Lazily created by getDelegateCoder(); transient, so it is null again after deserialization.
  @Nullable
  private transient Coder<Row> delegateCoder = null;

  /**
   * Creates a coder for rows of the given schema.
   *
   * <p>The coder id is the schema's UUID when it has one; otherwise a random UUID is generated
   * and assigned to a clone of the schema (the caller's schema object is not modified).
   *
   * @param schema the schema of the rows to encode and decode
   * @return a new {@code RowCoder} for {@code schema}
   */
  public static RowCoder of(Schema schema) {
    UUID schemaId = schema.getUUID();
    return new RowCoder(schema, (schemaId == null) ? UUID.randomUUID() : schemaId);
  }

  private RowCoder(Schema schema, UUID id) {
    Schema resolvedSchema = schema;
    if (schema.getUUID() != null) {
      checkArgument(
          schema.getUUID().equals(id),
          "Schema has a UUID that doesn't match argument to constructor. %s v.s. %s",
          schema.getUUID(),
          id);
    } else {
      // Clone the schema before modifying the Java object.
      resolvedSchema = SerializableUtils.clone(schema);
      setSchemaIds(resolvedSchema, id);
    }
    this.schema = resolvedSchema;
    this.id = id;
  }

  // Sets the schema id if it is missing, and then recursively ensures that every schema nested
  // in the field types has an id as well.
  private static void setSchemaIds(Schema schema, UUID id) {
    if (schema.getUUID() == null) {
      schema.setUUID(id);
    }
    for (Field field : schema.getFields()) {
      setSchemaIds(field.getType());
    }
  }

  // Walks a field type and assigns a fresh random id to every nested row schema lacking one.
  private static void setSchemaIds(FieldType fieldType) {
    switch (fieldType.getTypeName()) {
      case ROW:
        setSchemaIds(fieldType.getRowSchema(), UUID.randomUUID());
        return;
      case MAP:
        setSchemaIds(fieldType.getMapKeyType());
        setSchemaIds(fieldType.getMapValueType());
        return;
      case LOGICAL_TYPE:
        setSchemaIds(fieldType.getLogicalType().getBaseType());
        return;
      case ARRAY:
        setSchemaIds(fieldType.getCollectionElementType());
        return;
      default:
        // Primitive types carry no nested schema.
        return;
    }
  }

  // Return the generated coder class for this schema.
  private Coder<Row> getDelegateCoder() {
    if (delegateCoder == null) {
      // RowCoderGenerator caches based on id, so if a new instance of this RowCoder is
      // deserialized, we don't need to run ByteBuddy again to construct the class.
      delegateCoder = RowCoderGenerator.generate(schema);
    }
    return delegateCoder;
  }

  /** Returns the id of this coder, which equals the UUID of {@link #getSchema()}'s schema. */
  public UUID getId() {
    return id;
  }

  /** Returns the schema of the rows handled by this coder. */
  public Schema getSchema() {
    return schema;
  }

  @Override
  public void encode(Row value, OutputStream outStream) throws IOException {
    getDelegateCoder().encode(value, outStream);
  }

  @Override
  public Row decode(InputStream inStream) throws IOException {
    return getDelegateCoder().decode(inStream);
  }

  @Override
  public void verifyDeterministic()
      throws Coder.NonDeterministicException {
    verifyDeterministic(schema);
  }

  // Checks the coder of every top-level field of the given schema for determinism.
  private void verifyDeterministic(Schema schema)
      throws Coder.NonDeterministicException {

    List<Coder<?>> coders =
        schema.getFields().stream()
            .map(Field::getType)
            .map(RowCoder::coderForFieldType)
            .collect(Collectors.toList());

    verifyDeterministic(this, "All fields must have deterministic encoding", coders);
  }

  @Override
  public boolean consistentWithEquals() {
    return true;
  }

  /**
   * Returns the coder used for a given field type.
   *
   * <p>Rows, arrays and maps are resolved recursively. A logical type resolves to the coder of
   * its base type, matching how the rest of this class treats logical types. NOTE(review): this
   * assumes a logical type is encoded through its base-type representation - confirm against
   * {@link RowCoderGenerator}.
   *
   * @param fieldType the field type to look up
   * @return the matching coder, or {@code null} if the type has no entry in the primitive coder
   *     table
   */
  @SuppressWarnings("unchecked") // The coder's element type is determined by the field type.
  public static <T> Coder<T> coderForFieldType(FieldType fieldType) {
    switch (fieldType.getTypeName()) {
      case ROW:
        return (Coder<T>) RowCoder.of(fieldType.getRowSchema());
      case ARRAY:
        return (Coder<T>) ListCoder.of(coderForFieldType(fieldType.getCollectionElementType()));
      case MAP:
        return (Coder<T>)
            MapCoder.of(
                coderForFieldType(fieldType.getMapKeyType()),
                coderForFieldType(fieldType.getMapValueType()));
      case LOGICAL_TYPE:
        // Previously fell through to the table lookup and yielded null, which made
        // verifyDeterministic() fail with a NullPointerException for such schemas.
        return coderForFieldType(fieldType.getLogicalType().getBaseType());
      default:
        return (Coder<T>) CODER_MAP.get(fieldType.getTypeName());
    }
  }

  /**
   * Return the estimated serialized size of a given row object.
   *
   * @param row the row to measure
   * @return the estimated size in bytes: a bitmap of one 64-bit word per started group of 64
   *     fields plus the estimate of every field value
   */
  public static long estimatedSizeBytes(Row row) {
    Schema schema = row.getSchema();
    int fieldCount = schema.getFieldCount();
    // Evaluates to 0 for a schema without fields, since (-1 >> 6) + 1 == 0.
    long totalBytes = (((fieldCount - 1) >> 6) + 1) * 8L;

    // Accumulate in a long: narrowing each field estimate to int would silently truncate the
    // estimate of very large rows.
    for (int i = 0; i < fieldCount; ++i) {
      totalBytes += estimatedSizeBytes(schema.getField(i).getType(), row.getValue(i));
    }
    return totalBytes;
  }

  // Estimates the size of a single value of the given type. Fixed-width types use their nominal
  // size regardless of the value; variable-size types are measured, and a null value of such a
  // type contributes nothing.
  private static long estimatedSizeBytes(FieldType typeDescriptor, Object value) {
    TypeName typeName = typeDescriptor.getTypeName();
    if (typeName == TypeName.LOGICAL_TYPE) {
      return estimatedSizeBytes(typeDescriptor.getLogicalType().getBaseType(), value);
    }

    Integer fixedSize = ESTIMATED_FIELD_SIZES.get(typeName);
    if (fixedSize != null) {
      return fixedSize;
    }
    if (value == null) {
      // Nothing to measure; avoids dereferencing null below.
      return 0;
    }

    switch (typeName) {
      case ROW:
        return estimatedSizeBytes((Row) value);
      case ARRAY:
        long listSizeBytes = 0;
        for (Object element : (List<?>) value) {
          listSizeBytes += estimatedSizeBytes(typeDescriptor.getCollectionElementType(), element);
        }
        return 4 + listSizeBytes;
      case BYTES:
        return 4L + ((byte[]) value).length;
      case MAP:
        long mapSizeBytes = 0;
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
          // Keys are estimated like any other value, so key types without a fixed size
          // (e.g. BYTES) no longer fail on a missing table entry.
          mapSizeBytes += estimatedSizeBytes(typeDescriptor.getMapKeyType(), entry.getKey());
          mapSizeBytes += estimatedSizeBytes(typeDescriptor.getMapValueType(), entry.getValue());
        }
        return 4 + mapSizeBytes;
      case STRING:
        // Not always accurate - String.getBytes().length() would be more accurate here, but slower.
        return ((String) value).length();
      default:
        throw new IllegalArgumentException("No size estimate for field type " + typeName);
    }
  }

  /**
   * Returns a description made of the schema, the id and the delegate coder.
   *
   * <p>Note that this obtains the delegate coder, creating it if it does not exist yet.
   */
  @Override
  public String toString() {
    return "Schema: " + schema + "  UUID: " + id + " delegateCoder: " + getDelegateCoder();
  }
}